Median for RDDs, Datasets, and DataFrames

Getting Spark up and running


In [1]:
classpath.add(
  "org.apache.spark" %% "spark-core" % "2.0.2",
  "org.apache.spark" %% "spark-sql" % "2.0.2",
  "org.apache.spark" %% "spark-mllib" % "2.0.2"
);


143 new artifact(s)
143 new artifacts in macro
143 new artifacts in runtime
143 new artifacts in compile


In [2]:
import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}


import org.apache.spark.sql.{SparkSession, DataFrame, Dataset}

In [3]:
val spark = SparkSession.builder().master("local[*]").getOrCreate()


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
17/08/03 23:24:04 INFO SparkContext: Running Spark version 2.0.2
17/08/03 23:24:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/08/03 23:24:04 INFO SecurityManager: Changing view acls to: amir.ziai
17/08/03 23:24:04 INFO SecurityManager: Changing modify acls to: amir.ziai
17/08/03 23:24:04 INFO SecurityManager: Changing view acls groups to: 
17/08/03 23:24:04 INFO SecurityManager: Changing modify acls groups to: 
17/08/03 23:24:04 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users  with view permissions: Set(amir.ziai); groups with view permissions: Set(); users  with modify permissions: Set(amir.ziai); groups with modify permissions: Set()
17/08/03 23:24:04 INFO Utils: Successfully started service 'sparkDriver' on port 53119.
17/08/03 23:24:04 INFO SparkEnv: Registering MapOutputTracker
17/08/03 23:24:04 INFO SparkEnv: Registering BlockManagerMaster
17/08/03 23:24:04 INFO DiskBlockManager: Created local directory at /private/var/folders/8n/xtn3n2c50tbgtcr2pnh21dl4002rn2/T/blockmgr-ba771fc8-e73c-4b14-a159-787d0bb2a583
17/08/03 23:24:04 INFO MemoryStore: MemoryStore started with capacity 2004.6 MB
17/08/03 23:24:04 INFO SparkEnv: Registering OutputCommitCoordinator
17/08/03 23:24:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
17/08/03 23:24:05 INFO Utils: Successfully started service 'SparkUI' on port 4041.
17/08/03 23:24:05 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://192.168.0.4:4041
17/08/03 23:24:05 INFO Executor: Starting executor ID driver on host localhost
17/08/03 23:24:05 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 53120.
17/08/03 23:24:05 INFO NettyBlockTransferService: Server created on 192.168.0.4:53120
17/08/03 23:24:05 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, 192.168.0.4, 53120)
17/08/03 23:24:05 INFO BlockManagerMasterEndpoint: Registering block manager 192.168.0.4:53120 with 2004.6 MB RAM, BlockManagerId(driver, 192.168.0.4, 53120)
17/08/03 23:24:05 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, 192.168.0.4, 53120)
17/08/03 23:24:05 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
17/08/03 23:24:05 INFO SharedState: Warehouse path is 'file:/Users/amir.ziai/Dropbox/jupyter/spark-warehouse'.
spark: SparkSession = org.apache.spark.sql.SparkSession@6bdd6f2a

In [4]:
import spark.implicits._


import spark.implicits._
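Importing spark.implicits._ brings in the Encoders needed for createDataset and the .as[Double] conversions used below.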

Creating a Dataset[Double]


In [20]:
val ds1 = spark.createDataset(Seq(1)).map(_.toDouble)
val ds2 = spark.createDataset(Seq(1, 2)).map(_.toDouble)
val ds3 = spark.createDataset(Seq(1, 2, 3)).map(_.toDouble)
val ds4 = spark.createDataset(Seq(1, 2, 3, 4)).map(_.toDouble)
val ds5 = spark.createDataset(Seq(1, 2, 3, 4, 5)).map(_.toDouble)


ds1: Dataset[Double] = [value: double]
ds2: Dataset[Double] = [value: double]
ds3: Dataset[Double] = [value: double]
ds4: Dataset[Double] = [value: double]
ds5: Dataset[Double] = [value: double]

Dataset with an odd number of elements


In [14]:
val Array(median) = ds5.stat.approxQuantile("value",
                                            Array(0.5),
                                            relativeError = 0.1)


median: Double = 3.0

This is strange to me. My understanding is that relativeError = 0 is supposed to result in an exact quantile calculation, yet as the next cell shows it returns 4.0 rather than the exact median of 3.0. I will have to look into this further.


In [15]:
val Array(median) = ds5.stat.approxQuantile("value",
                                            Array(0.5),
                                            relativeError = 0)


median: Double = 4.0
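For a Dataset this small we can sanity-check the exact value on the driver; a minimal sketch, assuming the data fits in memory:

// Collect and sort locally; the middle element of the
// five sorted values is the exact median
val sorted = ds5.collect().sorted
val exactMedian = sorted(sorted.length / 2)  // 3.0

So 4.0 is indeed not the exact median of {1, 2, 3, 4, 5}.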

Dataset with an even number of elements


In [21]:
val Array(median) = ds4.stat.approxQuantile("value",
                                            Array(0.5),
                                            relativeError = 0.1)


median: Double = 2.0

In [22]:
val Array(median) = ds4.stat.approxQuantile("value",
                                            Array(0.5),
                                            relativeError = 0)


median: Double = 3.0
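Neither result is the exact median of {1, 2, 3, 4}, which is 2.5. approxQuantile appears to always return an actual element of the column rather than interpolating between the two middle values, so it cannot produce 2.5 here regardless of the relative error.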

Dataset with a single element


In [16]:
val Array(median) = ds1.stat.approxQuantile("value",
                                            Array(0.5),
                                            relativeError = 0.1)


median: Double = 1.0

In [17]:
val Array(median) = ds1.stat.approxQuantile("value",
                                            Array(0.5),
                                            relativeError = 0)


median: Double = 1.0

Exact median calculation with RDDs

This is not an efficient implementation, but it works: it sorts the Dataset, zips the sorted values with their index, and looks up the middle element(s) by position. A single-pass variant that avoids launching two jobs in the even case is sketched at the end.


In [28]:
import org.apache.spark.sql.Dataset


import org.apache.spark.sql.Dataset

In [42]:
def median(ds: Dataset[Double], column: String = "value"): Double = {
    // Sort the Dataset by the target column
    val dsOrdered = ds.orderBy(column)
    val count = ds.count()
    val dsDouble = dsOrdered.select(column).as[Double]

    // Zip the sorted values with their index so we can
    // look up the middle element(s) by position
    val dsWithIndex = dsDouble.rdd.zipWithIndex()
    if (count % 2 == 0) {
      // Even count: average the two middle elements
      val left = dsWithIndex
        .filter(_._2 == count / 2 - 1)
        .collect()(0)._1
      val right = dsWithIndex
        .filter(_._2 == count / 2)
        .collect()(0)._1
      (left + right) / 2
    } else {
      // Odd count: take the single middle element
      dsWithIndex
        .filter(_._2 == count / 2)
        .collect()(0)._1
    }
}


defined function median

In [38]:
median(ds5)


res37: Double = 3.0

In [39]:
median(ds4)


res38: Double = 2.5

In [40]:
median(ds1)


res39: Double = 1.0

In [41]:
median(ds2)


res40: Double = 1.5
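Since a DataFrame is just an alias for Dataset[Row] in Spark 2.x, the same function covers the DataFrame case once the column of interest is converted; a minimal sketch, assuming a hypothetical DataFrame df with a numeric column named "value":

median(df.select("value").as[Double])

Finally, here is a sketch of a single-pass variant that avoids launching two separate filter-and-collect jobs in the even case by selecting both middle indices at once. The names medianSinglePass and byIndex are my own, and this is not benchmarked; it is the same idea with the sort done on the RDD directly:

def medianSinglePass(ds: Dataset[Double]): Double = {
    val count = ds.count()
    // Sort the values, then key each one by its position
    val byIndex = ds.rdd.sortBy(identity).zipWithIndex().map(_.swap)
    if (count % 2 == 0) {
      // Even count: fetch both middle elements in one pass
      val middle = Set(count / 2 - 1, count / 2)
      val Array(left, right) = byIndex
        .filter { case (i, _) => middle.contains(i) }
        .values
        .collect()
      (left + right) / 2
    } else {
      // Odd count: look up the single middle element by key
      byIndex.lookup(count / 2).head
    }
}

medianSinglePass(ds4) should again give 2.5, and medianSinglePass(ds5) should give 3.0.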